/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client.factory;

import com.sankuai.xm.kafka.client.IMessageListener;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerMsgTask implements Runnable {
	private static final Logger log = LoggerFactory
			.getLogger(ConsumerMsgTask.class);
	private KafkaStream stream;
	private IMessageListener msgListener;

	public ConsumerMsgTask(KafkaStream stream, IMessageListener msgListener) {
		this.stream = stream;
		this.msgListener = msgListener;
	}

	public void run() {
		ConsumerIterator it = this.stream.iterator();
		try {
			while (it.hasNext()) {
				byte[] array = (byte[]) it.next().message();
				this.msgListener.recvMessage(array);
			}
		} catch (Exception e) {
			log.error("xm-kafka-client, recv.msg occur exception = {}.",
					StackTraceUtil.getStackTrace(e));
		}
	}
}